package cn.piflow.local

import java.util.Date
import java.util.concurrent.atomic.AtomicInteger

import cn.piflow._
import cn.piflow.util.Logging
import org.quartz._
import org.quartz.impl.StdSchedulerFactory
import org.quartz.impl.matchers.GroupMatcher

import scala.beans.BeanProperty
import scala.collection.JavaConversions.iterableAsScalaIterable

/**
 * Quartz-backed implementation of [[JobService]] and [[StatService]].
 *
 * Every submitted [[FlowGraph]] is wrapped in a Quartz job of class [[FlowGraphJob]]. The name of
 * the job's trigger (a counter value rendered as a string, in the group named after the
 * `FlowGraph` class) is what the methods of this class refer to as the job id.
 */
class FlowJobServiceImpl()
	extends JobService with StatService with Logging {
	// Source of unique trigger names; incremented once per scheduled flow.
	val jobId = new AtomicInteger(0);
	val quartzScheduler = StdSchedulerFactory.getDefaultScheduler();
	// Trigger bookkeeping shared with the scheduler and trigger listeners registered in start().
	val table: QuartzTriggerTable = new QuartzTriggerTableImpl();
	// Per-job statistics keyed by job id.
	// NOTE(review): plain mutable Map with no synchronization visible here — confirm that
	// receive() and getStat() are never invoked concurrently.
	val stats = collection.mutable.Map[String, FlowJobStatImpl]();

	/** Registers the scheduler, job and trigger listeners, then starts the Quartz scheduler. */
	def start() = {
		val listeners = quartzScheduler.getListenerManager;
		listeners.addSchedulerListener(new QuartzSchedulerListenerImpl(table));
		listeners.addJobListener(new QuartzJobListenerImpl());
		listeners.addTriggerListener(new QuartzTriggerListenerImpl(table));
		quartzScheduler.start();
	}

	/** Shuts the underlying Quartz scheduler down. */
	def shutdown() = {
		quartzScheduler.shutdown();
	}

	/**
	 * Schedules `flowGraph` with a default [[JobSchedule]] and then waits on the trigger table's
	 * `awaitFinalized` for the newly created trigger.
	 *
	 * @param flowGraph flow to execute
	 * @param timeout   value handed to `awaitFinalized` when positive; any non-positive value is
	 *                  passed as 0 (presumably "no limit" — TODO confirm against QuartzTriggerTable)
	 * @return the scheduled-job snapshot taken when the flow was scheduled
	 */
	def run(flowGraph: FlowGraph, timeout: Long): ScheduledJob = {
		val (_, trigger, scheduledJob) = scheduleInternal(flowGraph, new JobSchedule());
		val key = trigger.getKey;

		val effectiveTimeout = if (timeout > 0) timeout else 0L;
		table.get(scheduledJob.getId()).awaitFinalized(effectiveTimeout);

		// With a positive timeout the trigger may still be registered after the wait; remove it.
		if (timeout > 0 && quartzScheduler.getTrigger(key) != null)
			quartzScheduler.unscheduleJob(key);

		scheduledJob;
	}

	/**
	 * Schedules `flowGraph` according to `schedule` and returns immediately.
	 *
	 * @return snapshot describing the newly scheduled job
	 */
	override def schedule(flowGraph: FlowGraph, schedule: JobSchedule): ScheduledJob =
		scheduleInternal(flowGraph, schedule)._3;

	/**
	 * Builds the Quartz job and trigger for `flowGraph`, records the trigger in `table` and hands
	 * both to the scheduler.
	 *
	 * @param flowGraph flow to execute; stored in the job data map under "flowGraph"
	 * @param schedule  optional schedule builder and optional start time; without a start time
	 *                  the trigger starts immediately
	 * @return the job detail, the trigger, and a snapshot built from both
	 */
	def scheduleInternal(flowGraph: FlowGraph, schedule: JobSchedule): (JobDetail, Trigger, ScheduledJob) = {
		//validation
		validate(flowGraph);

		val jobDetail =
			JobBuilder.newJob(classOf[FlowGraphJob])
				.build();

		//TODO: persist to recovery
		jobDetail.getJobDataMap.put("flowGraph", flowGraph);
		jobDetail.getJobDataMap.put("jobService", this);

		val triggerId = "" + jobId.incrementAndGet();
		val triggerBuilder = TriggerBuilder
			.newTrigger()
			.withIdentity(triggerId, classOf[FlowGraph].getName);

		schedule.scheduleBuilder.foreach { builder ⇒
			triggerBuilder.withSchedule(builder);
		}

		schedule.startTime match {
			case Some(startTime) ⇒ triggerBuilder.startAt(startTime);
			case None ⇒ triggerBuilder.startNow();
		}

		val trigger = triggerBuilder.build();
		// The trigger is recorded in the table before it is handed to the scheduler.
		table.login(trigger);

		quartzScheduler.scheduleJob(jobDetail, trigger);
		(jobDetail, trigger, new PojoFlowScheduledJob(jobDetail, trigger));
	}

	/** Placeholder for flow-graph validation; currently performs no checks. */
	private def validate(flowGraph: FlowGraph): Unit = {
		//ports
		//no-loop
	}

	/**
	 * Returns the statistics recorded for `jobId`, or a fresh [[FlowJobStatImpl]] when none exist.
	 * The fresh instance is not stored in `stats`.
	 */
	def getStat(jobId: String): FlowJobStat = {
		stats.getOrElse(jobId, new FlowJobStatImpl(jobId));
	}

	/** Forwards `event` to the statistics object of `jobId`, creating and storing it on first use. */
	def receive(jobId: String, event: FlowEvent) = {
		stats.getOrElseUpdate(jobId, new FlowJobStatImpl(jobId)).receive(event);
	}

	/** Returns the fire count that the trigger table reports for `jobId`. */
	def getFireCount(jobId: String): Int = {
		table.get(jobId).getFireCount();
	}

	/** Tells whether the scheduler still holds a trigger for `jobId`. */
	def exists(jobId: String): Boolean = {
		quartzScheduler.getTrigger(jobId2TriggerKey(jobId)) != null;
	}

	// Job ids are trigger names inside the group named after the FlowGraph class.
	private def jobId2TriggerKey(jobId: String) =
		new TriggerKey(jobId, classOf[FlowGraph].getName);

	/** Returns the historic executions from the trigger table whose trigger name equals `jobId`. */
	def getHistoricExecutions(jobId: String) = {
		table.getHistoricExecutions().filter(_.getTrigger.getKey.getName.equals(jobId)).map {
			new PojoFlowJobInstance(_);
		}
	}

	/** Returns all historic executions known to the trigger table. */
	def getHistoricExecutions() = {
		table.getHistoricExecutions().map {
			new PojoFlowJobInstance(_);
		}
	}

	/**
	 * Lists a snapshot for every trigger currently registered in the flow-graph trigger group.
	 * Keys whose trigger can no longer be looked up are omitted.
	 */
	def getScheduledJobs(): Seq[ScheduledJob] = {
		quartzScheduler.getTriggerKeys(GroupMatcher.groupEquals(classOf[FlowGraph].getName)).flatMap { tk: TriggerKey ⇒
			// getTrigger is treated as nullable elsewhere in this class; a key listed above may
			// no longer resolve, so skip it rather than failing on trigger.getJobKey.
			Option(quartzScheduler.getTrigger(tk)).map { trigger ⇒
				new PojoFlowScheduledJob(quartzScheduler.getJobDetail(trigger.getJobKey), trigger);
			}
		}.toSeq
	}

	/** Returns the currently executing job instances whose scheduled job has id `jobId`. */
	def getRunningJobs(jobId: String): Seq[JobInstance] = {
		getRunningJobs().filter { job ⇒
			// Value equality: ids are distinct String instances, so reference equality (`eq`)
			// would practically never match.
			jobId == job.getScheduledJob().getId()
		}
	}

	/** Returns a snapshot of every job the scheduler reports as currently executing. */
	def getRunningJobs(): Seq[JobInstance] = {
		quartzScheduler.getCurrentlyExecutingJobs.map { ctx ⇒
			new PojoFlowJobInstance(ctx);
		}.toSeq
	}

	/** Resumes the trigger of `jobId`. */
	def resume(jobId: String) = {
		quartzScheduler.resumeTrigger(jobId2TriggerKey(jobId));
	}

	/** Pauses the trigger of `jobId`. */
	def pause(jobId: String) = {
		quartzScheduler.pauseTrigger(jobId2TriggerKey(jobId));
	}

	/** Unschedules the trigger of `jobId`. */
	def stop(jobId: String) = {
		quartzScheduler.unscheduleJob(jobId2TriggerKey(jobId));
	}
}

/**
 * Immutable snapshot of a single job execution, exposed through bean-style getters.
 *
 * @param id           identifier of this execution
 * @param scheduledJob snapshot of the scheduled job this execution belongs to
 * @param startTime    time the execution was fired
 * @param runTime      run time as reported by the execution context
 * @param refireCount  number of refires reported by the execution context
 */
case class PojoFlowJobInstance(
	@BeanProperty id: String,
	@BeanProperty scheduledJob: PojoFlowScheduledJob,
	@BeanProperty startTime: Date,
	@BeanProperty runTime: Long,
	@BeanProperty refireCount: Int
) extends JobInstance {
	/**
	 * Copies the fire-instance id, trigger/job snapshot, fire time, run time and refire count
	 * out of a Quartz execution context.
	 */
	def this(ctx: JobExecutionContext) =
		this(
			id = ctx.getFireInstanceId,
			scheduledJob = new PojoFlowScheduledJob(ctx.getJobDetail, ctx.getTrigger),
			startTime = ctx.getFireTime,
			runTime = ctx.getJobRunTime,
			refireCount = ctx.getRefireCount
		)
}

/**
 * Immutable snapshot of a scheduled flow job, exposed through bean-style getters.
 *
 * @param id               job id; the auxiliary constructor uses the Quartz trigger name
 * @param nextFireTime     next time the trigger is due to fire
 * @param startTime        time at which the trigger starts
 * @param endTime          time at which the trigger ends
 * @param previousFireTime last time the trigger fired
 */
case class PojoFlowScheduledJob(@BeanProperty id: String,
                                @BeanProperty nextFireTime: Date,
                                @BeanProperty startTime: Date,
                                @BeanProperty endTime: Date,
                                @BeanProperty previousFireTime: Date) extends ScheduledJob {
	/**
	 * Builds the snapshot from a Quartz trigger.
	 *
	 * @param detail  job detail the trigger belongs to (not read at present)
	 * @param trigger trigger supplying the id and every time field
	 */
	def this(detail: JobDetail, trigger: Trigger) =
		this(trigger.getKey.getName,
			trigger.getNextFireTime,
			trigger.getStartTime,
			// endTime must be taken from getEndTime; passing the start time here would make
			// getEndTime() report the start of the trigger instead of its end.
			trigger.getEndTime,
			trigger.getPreviousFireTime);
}